package me.info.sboot.app.cfg;

import java.util.Arrays;
import java.util.Objects;

import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.elasticsearch.client.Node;
import org.elasticsearch.client.NodeSelector;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClient.FailureListener;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.sniff.SniffOnFailureListener;
import org.elasticsearch.client.sniff.Sniffer;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

import lombok.extern.slf4j.Slf4j;
import me.info.sboot.app.cfg.prop.EsProp;

/**
 * Spring configuration that wires the Elasticsearch REST clients.
 *
 * <p>Exposes a pre-configured {@link RestClientBuilder}, a low-level {@link RestClient} with node
 * sniffing, and a {@link RestHighLevelClient}. Hosts are read from the {@code es.nodes} property
 * (each entry in {@code host:port} form); timeouts, connection-pool sizes and credentials are read
 * from {@link EsProp}.
 *
 * @author Amber
 * @version v0.0.1
 * @date 2020/12/19 13:53
 */
@Slf4j
@Configuration
public class EsCfg implements DisposableBean {

  private static final String HTTP_SCHEME = "http";

  /** Interval between two regular sniffing rounds, in milliseconds. */
  private static final int SNIFF_INTERVAL_MILLIS = 60000;

  final EsProp esProp;

  @Value("${es.nodes}")
  String[] esNodes;

  /**
   * Sniffer created by {@link #restClient(RestClientBuilder)}; kept so that it can be closed in
   * {@link #destroy()}. Stays {@code null} until the low-level client has been built.
   */
  private volatile Sniffer sniffer;

  public EsCfg(EsProp esProp) {
    this.esProp = esProp;
  }

  /**
   * Creates a builder configured with the hosts, failure logging, node selector, timeouts,
   * connection-pool limits and (when a username is configured) basic-auth credentials.
   *
   * <p>The bean is prototype-scoped: {@link RestClientBuilder} is mutable, so every injection
   * point receives its own instance and customisations made for one client (such as a different
   * failure listener) cannot leak into another.
   *
   * @return a freshly configured builder
   * @throws IllegalArgumentException if {@code es.nodes} yields no usable host or contains an
   *     entry with a non-numeric port
   */
  @Scope("prototype")
  @Bean
  public RestClientBuilder restClientBuilder() {
    HttpHost[] hosts =
        Arrays.stream(esNodes)
            .map(this::makeHttpHost)
            .filter(Objects::nonNull)
            .toArray(HttpHost[]::new);
    log.debug("es hosts {}", Arrays.toString(hosts));
    if (hosts.length == 0) {
      throw new IllegalArgumentException(
          "No usable Elasticsearch host in es.nodes: " + Arrays.toString(esNodes));
    }

    // null when no username is configured, i.e. the cluster is accessed anonymously.
    final CredentialsProvider credentialsProvider = createCredentialsProvider();

    return RestClient.builder(hosts)
        .setFailureListener(
            new FailureListener() {
              @Override
              public void onFailure(Node node) {
                super.onFailure(node);
                // Log the node itself: its name is null unless it has been filled in by sniffing.
                log.warn("ES node failed: {}", node);
              }
            })
        .setNodeSelector(NodeSelector.SKIP_DEDICATED_MASTERS)
        .setRequestConfigCallback(
            requestConfigBuilder ->
                requestConfigBuilder
                    .setConnectTimeout(esProp.getConnectTimeout())
                    .setSocketTimeout(esProp.getSocketTimeout())
                    .setConnectionRequestTimeout(esProp.getConnectRequestTimeout()))
        // The builder keeps a single HTTP client callback (a later call replaces an earlier one),
        // so credentials and connection settings must be applied together here.
        //
        // Transport encryption can be configured in this callback as well, through
        // setSSLContext, setSSLStrategy or setConnectionManager (each later option takes
        // precedence over the earlier ones), e.g.:
        //   KeyStore truststore = KeyStore.getInstance("jks");
        //   try (InputStream is = Files.newInputStream(keyStorePath)) {
        //     truststore.load(is, keyStorePass.toCharArray());
        //   }
        //   SSLContext sslContext =
        //       SSLContexts.custom().loadTrustMaterial(truststore, null).build();
        //   httpAsyncClientBuilder.setSSLContext(sslContext);
        .setHttpClientConfigCallback(
            httpAsyncClientBuilder -> {
              httpAsyncClientBuilder
                  .setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(1).build())
                  .setMaxConnTotal(esProp.getMaxConnTotal())
                  .setMaxConnPerRoute(esProp.getMaxConnPreRoute())
                  .disableAuthCaching();
              if (credentialsProvider != null) {
                httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
              }
              return httpAsyncClientBuilder;
            });
  }

  /**
   * Builds the low-level client and attaches a {@link Sniffer} to it. The sniffer refreshes the
   * node list every {@value #SNIFF_INTERVAL_MILLIS} ms and additionally whenever a node failure is
   * reported through the {@link SniffOnFailureListener}.
   *
   * @param restClientBuilder builder to create the client from; its failure listener is replaced
   *     by the sniff-on-failure listener
   * @return the low-level REST client; closed by Spring through {@code close()}
   */
  @Scope("singleton")
  @Bean(destroyMethod = "close")
  public RestClient restClient(@Autowired RestClientBuilder restClientBuilder) {
    SniffOnFailureListener sniffOnFailureListener = new SniffOnFailureListener();

    RestClient restClient = restClientBuilder.setFailureListener(sniffOnFailureListener).build();

    Sniffer createdSniffer =
        Sniffer.builder(restClient).setSniffIntervalMillis(SNIFF_INTERVAL_MILLIS).build();
    sniffOnFailureListener.setSniffer(createdSniffer);
    // Remember the sniffer so its background scheduler can be stopped on shutdown.
    this.sniffer = createdSniffer;

    return restClient;
  }

  /**
   * Builds the high-level client on top of the given builder.
   *
   * @param restClientBuilder builder used to create the underlying low-level client
   * @return the high-level REST client; closed by Spring through {@code close()}
   */
  @Scope("singleton")
  @Bean(destroyMethod = "close")
  public RestHighLevelClient rhlClient(@Autowired RestClientBuilder restClientBuilder) {
    return new RestHighLevelClient(restClientBuilder);
  }

  /**
   * Stops the sniffer created for the low-level client, if one was created, so that its
   * background thread does not outlive the application context.
   */
  @Override
  public void destroy() throws Exception {
    Sniffer current = sniffer;
    if (current != null) {
      sniffer = null;
      current.close();
    }
  }

  /**
   * Creates the basic-auth credentials provider from {@link EsProp}.
   *
   * @return a provider holding the configured username and password for any auth scope, or
   *     {@code null} when no username is configured
   */
  private CredentialsProvider createCredentialsProvider() {
    if (StringUtils.isEmpty(esProp.getUsername())) {
      return null;
    }
    CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
    credentialsProvider.setCredentials(
        AuthScope.ANY, new UsernamePasswordCredentials(esProp.getUsername(), esProp.getPassword()));
    return credentialsProvider;
  }

  /**
   * Parses one {@code es.nodes} entry of the form {@code host:port} into an {@link HttpHost} using
   * the {@code http} scheme.
   *
   * @param s the raw entry; surrounding whitespace is ignored
   * @return the parsed host, or {@code null} when the entry is blank or not in {@code host:port}
   *     form (such entries are logged and skipped by the caller)
   * @throws IllegalArgumentException if the port part is not a valid integer
   */
  private HttpHost makeHttpHost(String s) {
    if (StringUtils.isBlank(s)) {
      log.warn("Ignoring blank es.nodes entry");
      return null;
    }
    String[] address = s.trim().split(":");
    if (address.length != 2) {
      log.warn("Ignoring malformed es.nodes entry '{}', expected host:port", s);
      return null;
    }
    String host = address[0].trim();
    try {
      int port = Integer.parseInt(address[1].trim());
      return new HttpHost(host, port, HTTP_SCHEME);
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException("Invalid port in es.nodes entry '" + s + "'", e);
    }
  }

  // Disabled alternative: high-level client created through Spring Data's ClientConfiguration.
  /*@Primary
  @Bean
  RestHighLevelClient restHighLevelClient() {
    String[] nodes = esProp.getNodes().toArray(new String[0]);
    ClientConfiguration clientConfiguration = ClientConfiguration.builder().connectedTo(nodes).build();
    return RestClients.create(clientConfiguration).rest();

  }*/

}
